Extension methods

Not all operators are loaded when you import rx; some live in submodules that have to be imported explicitly.


In [1]:
# Example: from_marbles
import rx
try:
    rx.Observable.from_marbles('a-b|')
except Exception as ex:
    # raised only on a fresh kernel, i.e. before rx.testing.marbles was imported
    print('error: %s' % ex)


error: type object 'Observable' has no attribute 'from_marbles'

In [2]:
# -> to see what's there, don't rely on e.g. `dir(Observable)`; instead
# search for 'def from_marbles' in the rx directory to find the module,
# then import it:
'''
~/GitHub/RxPY/rx $ ack 'def from_marbl'
testing/marbles.py
90:def from_marbles(self, scheduler=None):
'''
import rx.testing.marbles
def show(x): print (x)
stream = rx.Observable.from_marbles('a-b--c|').to_blocking().subscribe(show)


a
b
c
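
Why does a plain import make a new method appear on Observable? The following is a minimal sketch of the general mechanism, assuming the operator module attaches its functions to the class when it is loaded. The names `extension_classmethod` and `from_letters` are hypothetical and only serve the illustration; this is not RxPY's actual code.

```python
class Observable(object):
    """Toy stand-in for a library class; not RxPY's real Observable."""
    def __init__(self, items):
        self.items = list(items)


def extension_classmethod(cls):
    """Hypothetical decorator: attach the decorated function to cls as a classmethod."""
    def register(func):
        setattr(cls, func.__name__, classmethod(func))
        return func
    return register


before = hasattr(Observable, 'from_letters')   # nothing attached yet

# In a real package this definition would live in its own module, so the
# attribute would only appear once that module has been imported.
@extension_classmethod(Observable)
def from_letters(cls, text):
    return cls(ch for ch in text if ch.isalpha())

after = hasattr(Observable, 'from_letters')    # the decorator has run by now
letters = Observable.from_letters('a-b--c|').items
print(before, after, letters)
```

Under this assumption the class stays small, and optional operators cost nothing until the module that defines them is imported.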

Async Operations

It helps to understand, at a high level, how and when RxPY handles asynchrony. For example, you might naively want to know, at the moment a value is pushed to a subscriber, which other subscribers are present.
That question makes no sense (I think in reactive programming in general), as the following example makes clear.

Consider the timestamps and thread names in the following output:


In [38]:
# =============================
# change these (both in milliseconds)
delay_stream, slow_emit = 0, 0
# =============================

import rx, threading, random, time
thread = threading.currentThread


def call_observer(obs):
    '''Invokes the observer's callbacks directly, i.e. blocking.'''
    print_out(obs.__class__.__name__, hash(obs))
    for i in range(2):
        obs.on_next(1)
        if slow_emit:
            # float division: integer division would truncate to 0 on Python 2
            time.sleep(slow_emit / 1000.0)
        obs.on_next(1)
        
stream = rx.Observable.create(call_observer).take(10)
if delay_stream:
    stream = stream.delay(delay_stream)

def print_out(*v):
    '''printout of current time, v, and current thread'''
    v_pretty = ' '.join([str(s) for s in v])
    print ('%.8f - %30s - %s\n' % (time.time(), v_pretty, thread().getName()))
    
    
d = stream.subscribe(lambda x: print_out('Observer 1', x))
d = stream.subscribe(lambda x: print_out('Observer 2', x))


1482356523.29513192 -   AutoDetachObserver 275507917 - MainThread

1482356523.29556108 -                   Observer 1 1 - MainThread

1482356523.29563403 -                   Observer 1 1 - MainThread

1482356523.29568505 -                   Observer 1 1 - MainThread

1482356523.29573011 -                   Observer 1 1 - MainThread

1482356523.29649496 -   AutoDetachObserver 275507817 - MainThread

1482356523.29657793 -                   Observer 2 1 - MainThread

1482356523.29663706 -                   Observer 2 1 - MainThread

1482356523.29668999 -                   Observer 2 1 - MainThread

1482356523.29674101 -                   Observer 2 1 - MainThread

  • As long as no time-related stream operator is involved, RxPY does everything synchronously.
  • RxPY goes async only when it has to, according to the nature of the async operation declared by the user.
  • It defaults to reasonable mechanics, e.g. using threading.
  • You can override these defaults by picking a "scheduler" (e.g. gevent, twisted, futures).
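
The difference between synchronous and time-based delivery can be sketched with the standard library alone. This is a toy model, not RxPY's implementation: `ToyObservable` and its `delay` are hypothetical names, and `threading.Timer` merely stands in for whatever a real scheduler would do.

```python
import threading


class ToyObservable(object):
    """Minimal model: subscribing simply calls the producer with the callback."""
    def __init__(self, producer):
        self.producer = producer

    def subscribe(self, on_next):
        self.producer(on_next)

    def delay(self, seconds):
        """Time-based operator: forward each value later, from a timer thread."""
        def delayed_producer(on_next):
            def schedule(value):
                threading.Timer(seconds, on_next, args=(value,)).start()
            self.producer(schedule)
        return ToyObservable(delayed_producer)


def emit_two(on_next):
    on_next(1)
    on_next(2)


main_name = threading.current_thread().name
sync_seen, async_seen = [], []
done = threading.Semaphore(0)

def record_sync(value):
    sync_seen.append((value, threading.current_thread().name))

def record_async(value):
    async_seen.append((value, threading.current_thread().name))
    done.release()

source = ToyObservable(emit_two)
# No time-based operator: both values arrive on the calling thread,
# before subscribe() returns.
source.subscribe(record_sync)
# With delay: the values are delivered later, from timer threads.
source.delay(0.1).subscribe(record_async)
done.acquire()
done.acquire()   # wait until both delayed values have arrived
print(sync_seen)
print(sorted(async_seen))
```

The producer `emit_two` is identical in both cases; only the operator chain decides on which thread the subscriber's callback runs.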

=> Inside the call_observer function you can't know about the concurrency situation. It depends solely on the design of the stream operations applied. See .ref_count() though, for published streams.
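
The output above shows call_observer running once per subscription, each time with a different observer object. A rough sketch of that behaviour, next to a shared variant in which one producer run serves all subscribers, might look as follows. `ColdSource` and `SharedSource` are hypothetical toy classes; the shared one only loosely mirrors the idea behind published streams and is not how RxPY implements them.

```python
class ColdSource(object):
    """Toy cold stream: the producer runs again for every subscriber."""
    def __init__(self, producer):
        self.producer = producer

    def subscribe(self, on_next):
        self.producer(on_next)


class SharedSource(object):
    """Toy shared stream: the producer runs once and fans out to all callbacks."""
    def __init__(self, producer):
        self.producer = producer
        self.callbacks = []

    def subscribe(self, on_next):
        self.callbacks.append(on_next)

    def connect(self):
        def fan_out(value):
            for callback in list(self.callbacks):
                callback(value)
        self.producer(fan_out)


producer_runs = []

def producer(on_next):
    producer_runs.append(1)   # count how often the producer is invoked
    on_next('x')


cold_log = []
cold = ColdSource(producer)
cold.subscribe(lambda v: cold_log.append(('A', v)))
cold.subscribe(lambda v: cold_log.append(('B', v)))
cold_runs = len(producer_runs)       # one producer run per subscriber

del producer_runs[:]
shared_log = []
shared = SharedSource(producer)
shared.subscribe(lambda v: shared_log.append(('A', v)))
shared.subscribe(lambda v: shared_log.append(('B', v)))
shared.connect()
shared_runs = len(producer_runs)     # a single run serves both subscribers
print(cold_runs, shared_runs)
```

In the cold case the producer only ever sees the one callback it was handed, which is why asking it about "other subscribers" has no meaningful answer.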

Check the .observe_on example for a deeper understanding of how scheduling works.

